/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.cassandra.utils.bytecomparable;

import static com.google.common.base.Preconditions.checkArgument;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import org.apache.cassandra.utils.bytecomparable.ByteComparable.Version;
import org.apache.cassandra.utils.memory.MemoryUtil;

/**
 * A stream of bytes, used for byte-order-comparable representations of data, and utilities to
 * convert various values to their byte-ordered translation. See ByteComparable.md for details about
 * the encoding scheme.
 */
public interface ByteSource {
  /**
   * Consume the next byte, unsigned. Must be between 0 and 255, or END_OF_STREAM if there are no
   * more bytes.
   */
  int next();

  /** Value returned if at the end of the stream. */
  int END_OF_STREAM = -1;

  ByteSource EMPTY = () -> END_OF_STREAM;

  /**
   * Escape value. Used, among other things, to mark the end of subcomponents (so that shorter
   * compares before anything longer). Actual zeros in input need to be escaped if this is in use
   * (see {@link AbstractEscaper}).
   */
  int ESCAPE = 0x00;

  // Zeros are encoded as a sequence of ESCAPE, 0 or more of ESCAPED_0_CONT, ESCAPED_0_DONE so
  // zeroed spaces only grow by 1 byte
  int ESCAPED_0_CONT = 0xFE;
  int ESCAPED_0_DONE = 0xFF;

  // All separators must be within these bounds
  int MIN_SEPARATOR = 0x10;
  int MAX_SEPARATOR = 0xEF;

  // Next component marker.
  int NEXT_COMPONENT = 0x40;
  // Marker used to present null values represented by empty buffers (e.g. by Int32Type)
  int NEXT_COMPONENT_EMPTY = 0x3F;
  int NEXT_COMPONENT_EMPTY_REVERSED = 0x41;
  // Marker for null components in tuples, maps, sets and clustering keys.
  int NEXT_COMPONENT_NULL = 0x3E;

  // Section for next component markers which is not allowed for use
  int MIN_NEXT_COMPONENT = 0x3C;
  int MAX_NEXT_COMPONENT = 0x44;

  // Default terminator byte in sequences. Smaller than NEXT_COMPONENT_NULL, but larger than
  // LT_NEXT_COMPONENT to
  // ensure lexicographic compares go in the correct direction
  int TERMINATOR = 0x38;
  // These are special endings, for exclusive/inclusive bounds (i.e. smaller than anything with more
  // components,
  // bigger than anything with more components)
  int LT_NEXT_COMPONENT = 0x20;
  int GT_NEXT_COMPONENT = 0x60;

  // Special value for components that should be excluded from the normal min/max span. (static
  // rows)
  int EXCLUDED = 0x18;

  /**
   * Encodes a byte buffer as a byte-comparable source that has 0s escaped and finishes in an
   * escape. This provides a weakly-prefix-free byte-comparable version of the content to use in
   * sequences. (See ByteSource.BufferEscaper/Multi for explanation.)
   */
  static ByteSource of(ByteBuffer buf, Version version) {
    return new BufferEscaper(buf, version);
  }

  /**
   * Encodes a byte array as a byte-comparable source that has 0s escaped and finishes in an escape.
   * This provides a prefix-free byte-comparable version of the content to use in sequences. (See
   * ByteSource.BufferEscaper/Multi for explanation.)
   */
  static ByteSource of(byte[] buf, Version version) {
    return new ArrayEscaper(buf, version);
  }

  /**
   * Encodes a memory range as a byte-comparable source that has 0s escaped and finishes in an
   * escape. This provides a weakly-prefix-free byte-comparable version of the content to use in
   * sequences. (See ByteSource.BufferEscaper/Multi for explanation.)
   */
  static ByteSource ofMemory(long address, int length, Version version) {
    return new MemoryEscaper(address, length, version);
  }

  /**
   * Combines a chain of sources, turning their weak-prefix-free byte-comparable representation into
   * the combination's prefix-free byte-comparable representation, with the included terminator
   * character. For correctness, the terminator must be within MIN-MAX_SEPARATOR and outside the
   * range reserved for NEXT_COMPONENT markers. Typically TERMINATOR, or LT/GT_NEXT_COMPONENT if
   * used for partially specified bounds.
   */
  static ByteSource withTerminator(int terminator, ByteSource... srcs) {
    assert terminator >= MIN_SEPARATOR && terminator <= MAX_SEPARATOR;
    assert terminator < MIN_NEXT_COMPONENT || terminator > MAX_NEXT_COMPONENT;
    return new Multi(srcs, terminator);
  }

  /**
   * As above, but permits any separator. The legacy format wasn't using weak prefix freedom and has
   * some non-reversible transformations.
   */
  static ByteSource withTerminatorLegacy(int terminator, ByteSource... srcs) {
    return new Multi(srcs, terminator);
  }

  static ByteSource withTerminatorMaybeLegacy(
      Version version, int legacyTerminator, ByteSource... srcs) {
    return version == Version.LEGACY
        ? withTerminatorLegacy(legacyTerminator, srcs)
        : withTerminator(TERMINATOR, srcs);
  }

  static ByteSource of(String s, Version version) {
    return new ArrayEscaper(s.getBytes(StandardCharsets.UTF_8), version);
  }

  static ByteSource of(long value) {
    return new Number(value ^ (1L << 63), 8);
  }

  static ByteSource of(int value) {
    return new Number(value ^ (1L << 31), 4);
  }

  /**
   * Produce a source for a signed integer, stored using variable length encoding. The
   * representation uses between 1 and 9 bytes, is prefix-free and compares correctly.
   */
  static ByteSource variableLengthInteger(long value) {
    return new VariableLengthInteger(value);
  }

  /**
   * Returns a separator for two byte sources, i.e. something that is definitely > prevMax, and <=
   * currMin, assuming prevMax < currMin. This returns the shortest prefix of currMin that is
   * greater than prevMax.
   */
  public static ByteSource separatorPrefix(ByteSource prevMax, ByteSource currMin) {
    return new Separator(prevMax, currMin, true);
  }

  /**
   * Returns a separator for two byte sources, i.e. something that is definitely > prevMax, and <=
   * currMin, assuming prevMax < currMin. This is a source of length 1 longer than the common prefix
   * of the two sources, with last byte one higher than the prevMax source.
   */
  public static ByteSource separatorGt(ByteSource prevMax, ByteSource currMin) {
    return new Separator(prevMax, currMin, false);
  }

  public static ByteSource oneByte(int i) {
    assert i >= 0 && i <= 0xFF : "Argument must be a valid unsigned byte.";
    return new ByteSource() {
      boolean consumed = false;

      @Override
      public int next() {
        if (consumed) return END_OF_STREAM;
        consumed = true;
        return i;
      }
    };
  }

  public static ByteSource cut(ByteSource src, int cutoff) {
    return new ByteSource() {
      int pos = 0;

      @Override
      public int next() {
        return pos++ < cutoff ? src.next() : END_OF_STREAM;
      }
    };
  }

  /**
   * Wrap a ByteSource in a length-fixing facade.
   *
   * <p>If the length of {@code src} is less than {@code cutoff}, then pad it on the right with
   * {@code padding} until the overall length equals {@code cutoff}. If the length of {@code src} is
   * greater than {@code cutoff}, then truncate {@code src} to that size. Effectively a noop if
   * {@code src} happens to have length {@code cutoff}.
   *
   * @param src the input source to wrap
   * @param cutoff the size of the source returned
   * @param padding a padding byte (an int subject to a 0xFF mask)
   */
  public static ByteSource cutOrRightPad(ByteSource src, int cutoff, int padding) {
    return new ByteSource() {
      int pos = 0;

      @Override
      public int next() {
        if (pos++ >= cutoff) {
          return END_OF_STREAM;
        }
        int next = src.next();
        return next == END_OF_STREAM ? padding : next;
      }
    };
  }

  /**
   * Variable-length encoding. Escapes 0s as ESCAPE + zero or more ESCAPED_0_CONT + ESCAPED_0_DONE.
   * If the source ends in 0, we use ESCAPED_0_CONT to make sure that the encoding remains smaller
   * than that source with a further 0 at the end. Finishes in an escaped state (either with ESCAPE
   * or ESCAPED_0_CONT), which in {@link Multi} is followed by a component separator between 0x10
   * and 0xFE.
   *
   * <p>E.g. "A\0\0B" translates to 4100FEFF4200 "A\0B\0" 4100FF4200FE (+00 for {@link
   * Version#LEGACY}) "A\0" 4100FE (+00 for {@link Version#LEGACY}) "AB" 414200
   *
   * <p>If in a single byte source, the bytes could be simply passed unchanged, but this would not
   * allow us to combine components. This translation preserves order, and since the encoding for 0
   * is higher than the separator also makes sure shorter components are treated as smaller.
   *
   * <p>The encoding is not prefix-free, since e.g. the encoding of "A" (4100) is a prefix of the
   * encoding of "A\0" (4100FE), but the byte following the prefix is guaranteed to be FE or FF,
   * which makes the encoding weakly prefix-free. Additionally, any such prefix sequence will
   * compare smaller than the value to which it is a prefix, because any permitted separator byte
   * will be smaller than the byte following the prefix.
   */
  abstract static class AbstractEscaper implements ByteSource {
    private final Version version;
    private int bufpos;
    private boolean escaped;

    AbstractEscaper(int position, Version version) {
      this.bufpos = position;
      this.version = version;
    }

    @Override
    public final int next() {
      if (bufpos >= limit()) {
        if (bufpos > limit()) return END_OF_STREAM;

        ++bufpos;
        if (escaped) {
          escaped = false;
          if (version == Version.LEGACY)
            --bufpos; // place an ESCAPE at the end of sequence ending in ESCAPE
          return ESCAPED_0_CONT;
        }
        return ESCAPE;
      }

      int index = bufpos++;
      int b = get(index) & 0xFF;
      if (!escaped) {
        if (b == ESCAPE) escaped = true;
        return b;
      } else {
        if (b == ESCAPE) return ESCAPED_0_CONT;
        --bufpos;
        escaped = false;
        return ESCAPED_0_DONE;
      }
    }

    protected abstract byte get(int index);

    protected abstract int limit();
  }

  static class BufferEscaper extends AbstractEscaper {
    private final ByteBuffer buf;

    private BufferEscaper(ByteBuffer buf, Version version) {
      super(buf.position(), version);
      this.buf = buf;
    }

    protected int limit() {
      return buf.limit();
    }

    protected byte get(int index) {
      return buf.get(index);
    }
  }

  static class ArrayEscaper extends AbstractEscaper {
    private final byte[] buf;

    private ArrayEscaper(byte[] buf, Version version) {
      super(0, version);
      this.buf = buf;
    }

    @Override
    protected byte get(int index) {
      return buf[index];
    }

    @Override
    protected int limit() {
      return buf.length;
    }
  }

  static class MemoryEscaper extends AbstractEscaper {
    private final long address;
    private final int length;

    MemoryEscaper(long address, int length, Version version) {
      super(0, version);
      this.address = address;
      this.length = length;
    }

    protected byte get(int index) {
      return MemoryUtil.getByte(address + index);
    }

    protected int limit() {
      return length;
    }
  }

  /**
   * Variable-length encoding for unsigned integers. The encoding is similar to UTF-8 encoding.
   * Numbers between 0 and 127 are encoded in one byte, using 0 in the most significant bit. Larger
   * values have 1s in as many of the most significant bits as the number of additional bytes in the
   * representation, followed by a 0. This ensures that longer numbers compare larger than shorter
   * ones. Since we never use a longer representation than necessary, this implies numbers compare
   * correctly. As the number of bytes is specified in the bits of the first, no value is a prefix
   * of another.
   */
  static class VariableLengthUnsignedInteger implements ByteSource {
    private final long value;
    private int pos = -1;

    public VariableLengthUnsignedInteger(long value) {
      this.value = value;
    }

    @Override
    public int next() {
      if (pos == -1) {
        int bitsMinusOne =
            63
                - (Long.numberOfLeadingZeros(
                    value | 1)); // 0 to 63 (the | 1 is to make sure 0 maps to 0 (1 bit))
        int bytesMinusOne = bitsMinusOne / 7;
        int mask =
            -256 >> bytesMinusOne; // sequence of bytesMinusOne 1s in the most-significant bits
        pos = bytesMinusOne * 8;
        return (int) ((value >>> pos) | mask) & 0xFF;
      }
      pos -= 8;
      if (pos < 0) return END_OF_STREAM;
      return (int) (value >>> pos) & 0xFF;
    }
  }

  /**
   * Variable-length encoding for signed integers. The encoding is based on the unsigned encoding
   * above, where the first bit stored is the inverted sign, followed by as many matching bits as
   * there are additional bytes in the encoding, followed by the two's complement of the number.
   * Because of the inverted sign bit, negative numbers compare smaller than positives, and because
   * the length bits match the sign, longer positive numbers compare greater and longer negative
   * ones compare smaller.
   *
   * <p>Examples: 0 encodes as 80 1 encodes as 81 -1 encodes as 7F 63 encodes as BF 64 encodes as
   * C040 -64 encodes as 40 -65 encodes as 3FBF 2^20-1 encodes as EFFFFF 2^20 encodes as F0100000
   * -2^20 encodes as 100000 2^64-1 encodes as FFFFFFFFFFFFFFFFFF -2^64 encodes as
   * 000000000000000000
   *
   * <p>As the number of bytes is specified in bits 2-9, no value is a prefix of another.
   */
  static class VariableLengthInteger implements ByteSource {
    private final long value;
    private int pos;

    public VariableLengthInteger(long value) {
      long negativeMask = value >> 63; // -1 for negative, 0 for positive
      value ^= negativeMask;

      int bits =
          64
              - Long.numberOfLeadingZeros(
                  value | 1); // 1 to 63 (can't be 64 because we flip negative numbers)
      int bytes = bits / 7 + 1; // 0-6 bits 1 byte 7-13 2 bytes etc to 56-63 9 bytes
      if (bytes >= 9) {
        value |= 0x8000000000000000L; // 8th bit, which doesn't fit the first byte
        pos =
            negativeMask < 0
                ? 256
                : -1; // out of 0-64 range integer such that & 0xFF is 0x00 for negative and 0xFF
        // for positive
      } else {
        long mask =
            (-0x100 >> bytes) & 0xFF; // one in sign bit and as many more as there are extra bytes
        pos = bytes * 8;
        value = value | (mask << (pos - 8));
      }

      value ^= negativeMask;
      this.value = value;
    }

    @Override
    public int next() {
      if (pos <= 0 || pos > 64) {
        if (pos == 0) return END_OF_STREAM;
        else {
          // 8-byte value, returning first byte
          int result = pos & 0xFF; // 0x00 for negative numbers, 0xFF for positive
          pos = 64;
          return result;
        }
      }
      pos -= 8;
      return (int) (value >>> pos) & 0xFF;
    }
  }

  static class Number implements ByteSource {
    private final long value;
    private int pos;

    public Number(long value, int length) {
      this.value = value;
      this.pos = length;
    }

    @Override
    public int next() {
      if (pos == 0) return END_OF_STREAM;
      return (int) ((value >> (--pos * 8)) & 0xFF);
    }
  }

  /**
   * Combination of multiple byte sources. Adds {@link NEXT_COMPONENT} before sources, or {@link
   * NEXT_COMPONENT_NULL} if next is null.
   */
  static class Multi implements ByteSource {
    private final ByteSource[] srcs;
    private int srcnum = -1;
    private final int sequenceTerminator;

    Multi(ByteSource[] srcs, int sequenceTerminator) {
      this.srcs = srcs;
      this.sequenceTerminator = sequenceTerminator;
    }

    @Override
    public int next() {
      if (srcnum == srcs.length) return END_OF_STREAM;

      int b = END_OF_STREAM;
      if (srcnum >= 0 && srcs[srcnum] != null) b = srcs[srcnum].next();
      if (b > END_OF_STREAM) return b;

      ++srcnum;
      if (srcnum == srcs.length) return sequenceTerminator;
      if (srcs[srcnum] == null) return NEXT_COMPONENT_NULL;
      return NEXT_COMPONENT;
    }
  }

  /**
   * Construct the shortest common prefix of prevMax and currMin that separates those two byte
   * streams. If {@code useCurr == true} the last byte of the returned stream comes from {@code
   * currMin} and is the first byte which is greater than byte on the corresponding position of
   * {@code prevMax}. Otherwise, the last byte of the returned stream comes from {@code prevMax} and
   * is incremented by one, still guaranteeing that it is <= than the byte on the corresponding
   * position of {@code currMin}.
   */
  static class Separator implements ByteSource {
    private final ByteSource prev;
    private final ByteSource curr;
    private boolean done = false;
    private final boolean useCurr;

    Separator(ByteSource prevMax, ByteSource currMin, boolean useCurr) {
      this.prev = prevMax;
      this.curr = currMin;
      this.useCurr = useCurr;
    }

    @Override
    public int next() {
      if (done) return END_OF_STREAM;
      int p = prev.next();
      int c = curr.next();
      assert p <= c : prev + " not less than " + curr;
      if (p == c) return c;
      done = true;
      return useCurr ? c : p + 1;
    }
  }

  /**
   * A byte source of the given bytes without any encoding. The resulting source is only guaranteed
   * to give correct comparison results and be prefix-free if the underlying type has a fixed
   * length. In tests, this method is also used to generate non-escaped test cases.
   */
  public static ByteSource fixedLength(ByteBuffer b) {
    return new ByteSource() {
      int pos = b.position() - 1;

      @Override
      public int next() {
        return ++pos < b.limit() ? b.get(pos) & 0xFF : END_OF_STREAM;
      }
    };
  }

  /**
   * A byte source of the given bytes without any encoding. If used in a sequence, the resulting
   * source is only guaranteed to give correct comparison results if the underlying type has a fixed
   * length. In tests, this method is also used to generate non-escaped test cases.
   */
  public static ByteSource fixedLength(byte[] b) {
    return fixedLength(b, 0, b.length);
  }

  public static ByteSource fixedLength(byte[] b, int offset, int length) {
    checkArgument(offset >= 0 && offset <= b.length);
    checkArgument(length >= 0 && offset + length <= b.length);

    return new ByteSource() {
      int pos = offset - 1;

      @Override
      public int next() {
        return ++pos < offset + length ? b[pos] & 0xFF : END_OF_STREAM;
      }
    };
  }

  public class Peekable implements ByteSource {
    private static final int NONE = Integer.MIN_VALUE;

    private final ByteSource wrapped;
    private int peeked = NONE;

    public Peekable(ByteSource wrapped) {
      this.wrapped = wrapped;
    }

    @Override
    public int next() {
      if (peeked != NONE) {
        int val = peeked;
        peeked = NONE;
        return val;
      } else return wrapped.next();
    }

    public int peek() {
      if (peeked == NONE) peeked = wrapped.next();
      return peeked;
    }
  }

  public static Peekable peekable(ByteSource p) {
    // When given a null source, we're better off not wrapping it and just returning null. This way
    // existing
    // code that doesn't know about ByteSource.Peekable, but handles correctly null ByteSources
    // won't be thrown
    // off by a non-null instance that semantically should have been null.
    if (p == null) return null;
    return (p instanceof Peekable) ? (Peekable) p : new Peekable(p);
  }
}
